package org.databandtech.job.jobs;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import org.databandtech.job.entity.SavableTaskJob;
import org.databandtech.job.entity.ScheduledTaskJob;
import org.databandtech.job.entity.tuple.Tuple3;
import org.databandtech.job.sink.HiveSqlQueryJob1MySQLSink;
import org.databandtech.job.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hive query job: runs a SQL query against Hive over JDBC and hands the
 * returned result set to a {@link SinkFunction} for storage.
 *
 * <p>{@link #run()} opens the connection described by {@code conStr}, executes
 * {@code sql} and forwards the {@link ResultSet} to
 * {@link #SaveData(ResultSet, SinkFunction)}. All JDBC resources opened by
 * {@code run()} are released before it returns.
 */
public class HiveSqlQueryJob implements ScheduledTaskJob, SavableTaskJob<ResultSet> {

	private static final Logger LOGGER = LoggerFactory.getLogger(HiveSqlQueryJob.class);

	/** Fully-qualified class name of the Hive JDBC driver loaded before connecting. */
	private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";

	private String key;
	private String cron;
	private String conStr; // e.g. "jdbc:hive2://192.168.13.200:10000/default"
	private String sql; // e.g. "select * from"
	private SinkFunction sink; // sink that stores the query result

	/**
	 * Creates a job with all of its settings.
	 *
	 * @param key    identifier of this job (written to the log after a successful save)
	 * @param cron   cron expression associated with this job
	 * @param conStr JDBC connection string passed to {@link DriverManager}
	 * @param sql    query text executed by {@link #run()}
	 * @param sink   sink that receives every row of the query result
	 */
	public HiveSqlQueryJob(String key, String cron, String conStr, String sql, SinkFunction sink) {
		this.key = key;
		this.cron = cron;
		this.conStr = conStr;
		this.sql = sql;
		this.sink = sink;
	}

	/** @return the JDBC connection string */
	public String getConStr() {
		return conStr;
	}

	/** @param conStr the JDBC connection string to use on the next run */
	public void setConStr(String conStr) {
		this.conStr = conStr;
	}

	/** @return the query text */
	public String getSql() {
		return sql;
	}

	/** @param sql the query text to execute on the next run */
	public void setSql(String sql) {
		this.sql = sql;
	}

	/** @return the identifier of this job */
	public String getKey() {
		return key;
	}

	/** @param key the identifier of this job */
	public void setKey(String key) {
		this.key = key;
	}

	/** @return the cron expression of this job */
	public String getCron() {
		return cron;
	}

	/** @param cron the cron expression of this job */
	public void setCron(String cron) {
		this.cron = cron;
	}

	/** @return the sink that stores the query result */
	public SinkFunction getsink() {
		return sink;
	}

	/** @param sink the sink that stores the query result */
	public void setsink(SinkFunction sink) {
		this.sink = sink;
	}

	/**
	 * Executes the configured query and stores its result through the sink.
	 *
	 * <p>Connects with an empty user name and password. Any failure (driver not
	 * found, connection or query error, error while closing) is logged and not
	 * propagated to the caller.
	 */
	@Override
	public void run() {
		try {
			Class.forName(HIVE_DRIVER);
			// try-with-resources closes the result set, the statement and the
			// connection (in that order) even when the query or the save fails.
			try (Connection con = DriverManager.getConnection(conStr, "", "");
					Statement stmt = con.createStatement();
					ResultSet rs = stmt.executeQuery(sql)) {
				SaveData(rs, sink); // store the result data via the sink
			}
		} catch (Exception ex) {
			// Boundary of the job: report the failure instead of letting it escape.
			LOGGER.error("HiveSqlQueryJob => {} failed", key, ex);
		}
	}

	/**
	 * Reads every row of {@code rs} and passes it to {@code sink}.
	 *
	 * <p>Each row is converted to a {@code Tuple3} built from column 1 (read as
	 * {@code int}), column 2 and column 3 (both read as {@code String}), so the
	 * query must produce at least three columns. The sink is opened before the
	 * first row and closed afterwards, whether or not an error occurred. The
	 * result set itself is not closed here.
	 *
	 * @param rs   result set to read; iterated from its current position
	 * @param sink sink that receives one tuple per row
	 * @return {@code "ok"} when all rows were handed to the sink; otherwise a
	 *         non-null description of the exception that stopped processing
	 */
	@Override
	public String SaveData(ResultSet rs, SinkFunction sink) {

		try {
			sink.open();
			ResultSetMetaData metaData = rs.getMetaData();
			int columnCount = metaData.getColumnCount();
			LOGGER.info("columnCount is => {}", columnCount);
			if (columnCount > 0) {
				// Column indexes are 1-based; asking for column 1 of an empty
				// projection would throw, so only log it when it exists.
				LOGGER.info("First columnLabel is => {},{}", metaData.getColumnLabel(1),
						metaData.getColumnTypeName(1));
			}
			while (rs.next()) {
				Tuple3<Integer, String, String> item = new Tuple3<>(rs.getInt(1), rs.getString(2),
						rs.getString(3));
				sink.invoke(item);
			}
			LOGGER.info("HiveSqlQueryJob => {}  run  当前线程名称 {} ", key, Thread.currentThread().getName());
			return "ok";
		} catch (Exception e) {
			LOGGER.error("HiveSqlQueryJob => {} could not save the query result", key, e);
			// getMessage() may be null (e.g. NullPointerException); never return null.
			String message = e.getMessage();
			return message != null ? message : e.toString();
		} finally {
			try {
				sink.close();
			} catch (Exception e) {
				// Closing is best effort: report it, but keep the result decided above.
				LOGGER.warn("HiveSqlQueryJob => {} could not close the sink", key, e);
			}
		}

	}

}
